AWS Glue の Dynamicframeを使わずDataframeを用いて自在にCSV/TSVファイルを出力する

AWS Glue の Dynamicframeを使わずDataframeを用いて自在にCSV/TSVファイルを出力する

Clock Icon2018.04.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

Dynamicframeは、フォーマットにCSVファイルを指定できますが、空白を含む文字列は自動的にダブルクォーテーションで括られてしまったり、デリミタ文字列はカンマのみとなります。今回はこの課題を解決するため、Dynamicframeを使わずDataframeを用いて自在にCSV/TSVファイル出力する方法をご紹介します。

DynamicframeによるCSVファイル出力の課題

空白を含む文字列は自動的にダブルクォーテーションで括られてしまう

例えば、一般的なISOフォーマットのTIMESTAMP形式2018-04:30 12:34:56のように空白が含まれている文字列をDynamicframeでCSVファイル出力すると、"2018-04:30 12:34:56"のように自動的にダブルクォーテーションで括られてしまいます。ダブルクォーテーションで括られたTIMESTAMP形式の文字列は、Amazon Redshift SpectrumやAmazon AthenaでTIMESTAMP型のデータとして自動認識できません。

つまり、Dynamicframeを用いると、データソースのCSVファイルのTIMESTAMP型のデータが、Amazon Redshift SpectrumやAmazon AthenaでTIMESTAMP型のデータとして認識できないCSVファイルフォーマットで出力されてしまいます。

デリミタ文字列はカンマのみ

Dynamicframeの出力オプションは、formatの指定できますが、formatにCSVを指定するとデリミタ文字列は、カンマのみとなります。つまり、タブ文字など任意の文字によって区切ることはできません。

つまり、Dynamicframeを用いると、ソースデータのTSVファイルからTSVファイルのターゲットデータを生成できません。

Dataframeを用いて自在にCSV/TSVファイル出力する

カラムデータは、yyyy-MM-dd HH:mm:ss形式のTIMESTAMPの文字列を生成して出力したとします。

Dataframeを用いたCSVファイルをgzip出力する方法

datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://mybucket/posdata", "compression": "gzip"}, format = "csv", transformation_ctx = "datasink2")

生成される上記のコードの代わりに、

df_out = applymapping1.toDF()
df_out.write.mode("append").option("sep", ",").csv("s3://mybucket/posdata", compression="gzip")

のようにDataframeを用いて出力します。

  • 1行目は、toDF()関数を用いて、Dynamicframe から Dataframe に変換しています。変換してしまえば後は、Apache Spark 標準の DataframeのAPIを利用して出力できます
  • 2行目は、セパレータ文字列にカンマを指定(option("sep", ","))と、データのGZIP圧縮を指定(compression="gzip")して出力します。

Dataframeを用いてCSVファイル出力すると、TIMESTAMP形式2018-04:30 12:34:56がダブルクォーテーションで括られず、そのまま出力できます。

なお、上記の方法で、TIMESTAMP型をそのまま出力すると、ISO/W3Cフォーマット(例. 2016-04-30T12:34:56.000Z)で出力されます。

Dataframeを用いたTSVファイルをgzip出力する方法

セパレータ文字列にタブを指定(option("sep", "\t"))して出力します。それ以外はCSVファイル出力と同様です。

df_out = applymapping1.toDF()
df_out.write.mode("append").option("sep", "\t").csv("s3://mybucket/posdata", compression="gzip")

最後に

公式のチュートリアルやドキュメントのサンプルは、CSVファイルをParquetファイル(カラムナファイルフォーマット)に変換する例がほとんどで、CSVからCSVやTSVからTSVに変換する例はありませんでした。そんなときは、Sparkが提供するDataframeでワークアラウンドがないか検討すると良いでしょう。しかし、Spark初心者が安易にDataframeでなんでも済ませようというのはお薦めしません。AWS Glueが提供するDynamicFrameは、とても良くできたフレームワークであり、Sparkの知見がないエンジニアでも容易にETLコードを安全に書くことができますので、DynamicFrameでできることは出来る限り、DynamicFrameを利用することをお薦めします。そして、将来的にはDataFrameを使わず、DynamicFrameのみでETLコードが書けるようになることを期待しています。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.